/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.solr;



import com.google.auto.value.AutoValue;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.transforms.*;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.util.BackOff;

import com.bff.gaia.unified.sdk.util.BackOffUtils;

import com.bff.gaia.unified.sdk.util.FluentBackoff;

import com.bff.gaia.unified.sdk.util.Sleeper;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableSet;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.Reshuffle;

import org.apache.http.client.HttpClient;

import org.apache.solr.client.solrj.SolrQuery;

import org.apache.solr.client.solrj.SolrServerException;

import org.apache.solr.client.solrj.impl.CloudSolrClient;

import org.apache.solr.client.solrj.impl.HttpClientUtil;

import org.apache.solr.client.solrj.impl.HttpSolrClient;

import org.apache.solr.client.solrj.request.UpdateRequest;

import org.apache.solr.client.solrj.request.schema.SchemaRequest;

import org.apache.solr.client.solrj.response.QueryResponse;

import org.apache.solr.client.solrj.response.schema.SchemaResponse;

import org.apache.solr.common.SolrDocument;

import org.apache.solr.common.SolrException;

import org.apache.solr.common.SolrInputDocument;

import org.apache.solr.common.cloud.*;

import org.apache.solr.common.params.CursorMarkParams;

import org.apache.solr.common.params.ModifiableSolrParams;

import org.joda.time.Duration;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.IOException;

import java.io.Serializable;

import java.util.ArrayList;

import java.util.Collection;

import java.util.Collections;

import java.util.function.Predicate;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**

 * Transforms for reading and writing data from/to Solr.

 *

 * <h3>Reading from Solr</h3>

 *

 * <p>{@link SolrIO#read SolrIO.read()} returns a bounded {@link PCollection

 * PCollection&lt;SolrDocument&gt;} representing Solr documents.

 *

 * <p>To configure the {@link SolrIO#read}, you have to provide a connection configuration

 * containing the Zookeeper address of the Solr cluster, and the collection name. The following

 * example illustrates options for configuring the source:

 *

 * <pre>{@code

 * SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration.create("127.0.0.1:9983");

 * // Optionally: .withBasicCredentials(username, password)

 *

 * PCollection<SolrDocument> docs = p.apply(

 *     SolrIO.read().from("my-collection").withConnectionConfiguration(conn));

 *

 * }</pre>

 *

 * <p>You can specify a query on the {@code read()} using {@code withQuery()}.

 *

 * <h3>Writing to Solr</h3>

 *

 * <p>To write documents to Solr, use {@link SolrIO#write SolrIO.write()}, which writes Solr

 * documents from a {@link PCollection PCollection&lt;SolrInputDocument&gt;} (which can be bounded

 * or unbounded).

 *

 * <p>To configure {@link SolrIO#write SolrIO.write()}, similar to the read, you have to provide a

 * connection configuration, and a collection name. For instance:

 *

 * <pre>{@code

 * PCollection<SolrInputDocument> inputDocs = ...;

 * inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn));

 *

 * }</pre>

 *

 * <p>When writing it is possible to customize the retry behavior if an error is encountered. By

 * default this is disabled and only one attempt will be made.

 */

@Experimental(Experimental.Kind.SOURCE_SINK)

public class SolrIO {



  private static final Logger LOG = LoggerFactory.getLogger(SolrIO.class);



  public static Read read() {

    // 1000 for batch size is good enough in many cases,

    // ex: if document size is large, around 10KB, the response's size will be around 10MB

    // if document seize is small, around 1KB, the response's size will be around 1MB

    return new AutoValue_SolrIO_Read.Builder().setBatchSize(1000).setQuery("*:*").build();

  }



  public static Write write() {

    // 1000 for batch size is good enough in many cases,

    // ex: if document size is large, around 10KB, the request's size will be around 10MB

    // if document size is small, around 1KB, the request's size will be around 1MB

    return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build();

  }



  private SolrIO() {}



  /** A POJO describing a connection configuration to Solr. */

  @AutoValue

  public abstract static class ConnectionConfiguration implements Serializable {



    abstract String getZkHost();



    @Nullable

    abstract String getUsername();



    @Nullable

    abstract String getPassword();



    abstract Builder builder();



    @AutoValue.Builder

    abstract static class Builder {

      abstract Builder setZkHost(String zkHost);



      abstract Builder setUsername(String username);



      abstract Builder setPassword(String password);



      abstract ConnectionConfiguration build();

    }



    /**

     * Creates a new Solr connection configuration.

     *

     * @param zkHost host of zookeeper

     * @return the connection configuration object

     */

    public static ConnectionConfiguration create(String zkHost) {

      checkArgument(zkHost != null, "zkHost can not be null");

      return new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(zkHost).build();

    }



    /** If Solr basic authentication is enabled, provide the username and password. */

    public ConnectionConfiguration withBasicCredentials(String username, String password) {

      checkArgument(username != null, "username can not be null");

      checkArgument(!username.isEmpty(), "username can not be empty");

      checkArgument(password != null, "password can not be null");

      checkArgument(!password.isEmpty(), "password can not be empty");

      return builder().setUsername(username).setPassword(password).build();

    }



    private void populateDisplayData(DisplayData.Builder builder) {

      builder.add(DisplayData.item("zkHost", getZkHost()));

      builder.addIfNotNull(DisplayData.item("username", getUsername()));

    }



    private HttpClient createHttpClient() {

      // This is bug in Solr, if we don't create a customize HttpClient,

      // UpdateRequest with commit flag will throw an authentication error.

      ModifiableSolrParams params = new ModifiableSolrParams();

      params.set(HttpClientUtil.PROP_BASIC_AUTH_USER, getUsername());

      params.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, getPassword());

      return HttpClientUtil.createClient(params);

    }



    AuthorizedSolrClient<CloudSolrClient> createClient() {

      CloudSolrClient solrClient = new CloudSolrClient(getZkHost(), createHttpClient());

      return new AuthorizedSolrClient<>(solrClient, this);

    }



    AuthorizedSolrClient<HttpSolrClient> createClient(String shardUrl) {

      HttpSolrClient solrClient = new HttpSolrClient(shardUrl, createHttpClient());

      return new AuthorizedSolrClient<>(solrClient, this);

    }

  }



  /**

   * A POJO encapsulating a configuration for retry behavior when issuing requests to Solr. A retry

   * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for

   * any of the following exceptions:

   *

   * <ul>

   *   <li>{@link IOException}

   *   <li>{@link SolrServerException}

   *   <li>{@link SolrException} where the {@link SolrException.ErrorCode} is one of:

   *       <ul>

   *         <li>{@link SolrException.ErrorCode#CONFLICT}

   *         <li>{@link SolrException.ErrorCode#SERVER_ERROR}

   *         <li>{@link SolrException.ErrorCode#SERVICE_UNAVAILABLE}

   *         <li>{@link SolrException.ErrorCode#INVALID_STATE}

   *         <li>{@link SolrException.ErrorCode#UNKNOWN}

   *       </ul>

   * </ul>

   */

  @AutoValue

  public abstract static class RetryConfiguration implements Serializable {

    @VisibleForTesting

    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();



    abstract int getMaxAttempts();



    abstract Duration getMaxDuration();



    abstract RetryPredicate getRetryPredicate();



    abstract Builder builder();



    @AutoValue.Builder

    abstract static class Builder {

      abstract SolrIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);



      abstract SolrIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);



      abstract SolrIO.RetryConfiguration.Builder setRetryPredicate(RetryPredicate retryPredicate);



      abstract SolrIO.RetryConfiguration build();

    }



    public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {

      checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");

      checkArgument(

          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),

          "maxDuration must be greater than 0");

      return new AutoValue_SolrIO_RetryConfiguration.Builder()

          .setMaxAttempts(maxAttempts)

          .setMaxDuration(maxDuration)

          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)

          .build();

    }



    // Exposed only to allow tests to easily simulate server errors

    @VisibleForTesting

    RetryConfiguration withRetryPredicate(RetryPredicate predicate) {

      checkArgument(predicate != null, "predicate must be provided");

      return builder().setRetryPredicate(predicate).build();

    }



    /**

     * An interface used to control if we retry the Solr call when a {@link Throwable} occurs. If

     * {@link RetryPredicate#test(Object)} returns true, {@link Write} tries to resend the requests

     * to the Solr server if the {@link RetryConfiguration} permits it.

     */

    @FunctionalInterface

    interface RetryPredicate extends Predicate<Throwable>, Serializable {}



    /** This is the default predicate used to test if a failed Solr operation should be retried. */

    private static class DefaultRetryPredicate implements RetryPredicate {

      private static final ImmutableSet<Integer> ELIGIBLE_CODES =

          ImmutableSet.of(

              SolrException.ErrorCode.CONFLICT.code,

              SolrException.ErrorCode.SERVER_ERROR.code,

              SolrException.ErrorCode.SERVICE_UNAVAILABLE.code,

              SolrException.ErrorCode.INVALID_STATE.code,

              SolrException.ErrorCode.UNKNOWN.code);



      @Override

      public boolean test(Throwable t) {

        return (t instanceof IOException

            || t instanceof SolrServerException

            || (t instanceof SolrException && ELIGIBLE_CODES.contains(((SolrException) t).code())));

      }

    }

  }



  /** A {@link PTransform} reading data from Solr. */

  @AutoValue

  public abstract static class Read extends PTransform<PBegin, PCollection<SolrDocument>> {

    private static final long MAX_BATCH_SIZE = 10000L;



    @Nullable

    abstract ConnectionConfiguration getConnectionConfiguration();



    @Nullable

    abstract String getCollection();



    abstract String getQuery();



    abstract int getBatchSize();



    abstract Builder builder();



    @AutoValue.Builder

    abstract static class Builder {

      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);



      abstract Builder setQuery(String query);



      abstract Builder setBatchSize(int batchSize);



      abstract Builder setCollection(String collection);



      abstract Read build();

    }



    /** Provide the Solr connection configuration object. */

    public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {

      checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");

      return builder().setConnectionConfiguration(connectionConfiguration).build();

    }



    /**

     * Provide name of collection while reading from Solr.

     *

     * @param collection the collection toward which the requests will be issued

     */

    public Read from(String collection) {

      checkArgument(collection != null, "collection can not be null");

      return builder().setCollection(collection).build();

    }



    /**

     * Provide a query used while reading from Solr.

     *

     * @param query the query. See <a

     *     href="https://cwiki.apache.org/confluence/display/solr/The+Standard+Query+Parser">Solr

     *     Query </a>

     */

    public Read withQuery(String query) {

      checkArgument(query != null, "query can not be null");

      checkArgument(!query.isEmpty(), "query can not be empty");

      return builder().setQuery(query).build();

    }



    /**

     * Provide a size for the cursor read. See <a

     * href="https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results">cursor API</a>

     * Default is 100. Maximum is 10 000. If documents are small, increasing batch size might

     * improve read performance. If documents are big, you might need to decrease batchSize

     *

     * @param batchSize number of documents read in each scroll read

     */

    @VisibleForTesting

    Read withBatchSize(int batchSize) {

      // TODO remove this configuration, we can figure out the best number

      // by tuning batchSize when pipelines run.

      checkArgument(

          batchSize > 0 && batchSize < MAX_BATCH_SIZE,

          "Valid values for batchSize are 1 (inclusize) to %s (exclusive), but was: %s ",

          MAX_BATCH_SIZE,

          batchSize);

      return builder().setBatchSize(batchSize).build();

    }



    @Override

    public PCollection<SolrDocument> expand(PBegin input) {

      checkArgument(

          getConnectionConfiguration() != null, "withConnectionConfiguration() is required");

      checkArgument(getCollection() != null, "from() is required");



      return input

          .apply("Create", Create.of(this))

          .apply("Split", ParDo.of(new SplitFn()))

          .apply("Reshuffle", Reshuffle.viaRandomKey())

          .apply("Read", ParDo.of(new ReadFn()));

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("query", getQuery()));

      getConnectionConfiguration().populateDisplayData(builder);

    }

  }



  /** A POJO describing a replica of Solr. */

  @AutoValue

  abstract static class ReplicaInfo implements Serializable {

    public abstract String coreName();



    public abstract String coreUrl();



    public abstract String baseUrl();



    static ReplicaInfo create(Replica replica) {

      return new AutoValue_SolrIO_ReplicaInfo(

          replica.getStr(ZkStateReader.CORE_NAME_PROP),

          replica.getCoreUrl(),

          replica.getStr(ZkStateReader.BASE_URL_PROP));

    }

  }



  static class SplitFn extends DoFn<Read, KV<Read, ReplicaInfo>> {

    @ProcessElement

    public void process(@Element SolrIO.Read spec, OutputReceiver<KV<Read, ReplicaInfo>> out)

        throws IOException {

      ConnectionConfiguration connectionConfig = spec.getConnectionConfiguration();

      try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfig.createClient()) {

        String collection = spec.getCollection();

        final ClusterState clusterState = AuthorizedSolrClient.getClusterState(client);

        DocCollection docCollection = clusterState.getCollection(collection);

        for (Slice slice : docCollection.getSlices()) {

          ArrayList<Replica> replicas = new ArrayList<>(slice.getReplicas());

          Collections.shuffle(replicas);

          // Load balancing by randomly picking an active replica

          Replica randomActiveReplica = null;

          for (Replica replica : replicas) {

            // We need to check both state of the replica and live nodes

            // to make sure that the replica is alive

            if (replica.getState() == Replica.State.ACTIVE

                && clusterState.getLiveNodes().contains(replica.getNodeName())) {

              randomActiveReplica = replica;

              break;

            }

          }

          // TODO in case of this replica goes inactive while the pipeline runs.

          // We should pick another active replica of this shard.

          checkState(

              randomActiveReplica != null,

              "Can not found an active replica for slice %s",

              slice.getName());

          out.output(KV.of(spec, ReplicaInfo.create(checkNotNull(randomActiveReplica))));

        }

      }

    }

  }



  static class ReadFn extends DoFn<KV<Read, ReplicaInfo>, SolrDocument> {

    @ProcessElement

    public void process(

		@Element KV<Read, ReplicaInfo> specAndReplica, OutputReceiver<SolrDocument> out)

        throws IOException {

      Read spec = specAndReplica.getKey();

      ReplicaInfo replica = specAndReplica.getValue();

      String cursorMark = CursorMarkParams.CURSOR_MARK_START;

      String query = spec.getQuery();

      if (query == null) {

        query = "*:*";

      }

      SolrQuery solrQuery = new SolrQuery(query);

      solrQuery.setRows(spec.getBatchSize());

      solrQuery.setDistrib(false);

      try (AuthorizedSolrClient<HttpSolrClient> client =

          spec.getConnectionConfiguration().createClient(replica.baseUrl())) {

        SchemaRequest.UniqueKey request = new SchemaRequest.UniqueKey();

        try {

          SchemaResponse.UniqueKeyResponse response = client.process(spec.getCollection(), request);

          solrQuery.addSort(response.getUniqueKey(), SolrQuery.ORDER.asc);

        } catch (SolrServerException e) {

          throw new IOException("Can not get unique key from solr", e);

        }



        while (true) {

          solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);

          try {

            QueryResponse response;

            response = client.query(replica.coreName(), solrQuery);

            if (cursorMark.equals(response.getNextCursorMark())) {

              break;

            }

            cursorMark = response.getNextCursorMark();

            for (SolrDocument doc : response.getResults()) {

              out.output(doc);

            }

          } catch (SolrServerException e) {

            throw new IOException(e);

          }

        }

      }

    }

  }



  /** A {@link PTransform} writing data to Solr. */

  @AutoValue

  public abstract static class Write extends PTransform<PCollection<SolrInputDocument>, PDone> {

    @Nullable

    abstract ConnectionConfiguration getConnectionConfiguration();



    @Nullable

    abstract String getCollection();



    abstract int getMaxBatchSize();



    abstract Builder builder();



    @Nullable

    abstract RetryConfiguration getRetryConfiguration();



    @AutoValue.Builder

    abstract static class Builder {

      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);



      abstract Builder setCollection(String collection);



      abstract Builder setMaxBatchSize(int maxBatchSize);



      abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);



      abstract Write build();

    }



    /** Provide the Solr connection configuration object. */

    public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {

      checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");

      return builder().setConnectionConfiguration(connectionConfiguration).build();

    }



    /**

     * Provide name of collection while reading from Solr.

     *

     * @param collection the collection toward which the requests will be issued

     */

    public Write to(String collection) {

      checkArgument(collection != null, "collection can not be null");

      return builder().setCollection(collection).build();

    }



    /**

     * Provide a maximum size in number of documents for the batch. Depending on the execution

     * engine, size of bundles may vary, this sets the maximum size. Change this if you need to have

     * smaller batch. Default max batch size is 1000.

     *

     * @param batchSize maximum batch size in number of documents

     */

    public Write withMaxBatchSize(int batchSize) {

      checkArgument(batchSize > 0, "batchSize must be larger than 0, but was: %s", batchSize);

      return builder().setMaxBatchSize(batchSize).build();

    }



    /**

     * Provides configuration to retry a failed batch call to Solr. A batch is considered as failed

     * if the underlying {@link CloudSolrClient} surfaces {@link

     * org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException}, {@link

     * SolrServerException} or {@link IOException}. Users should consider that retrying might

     * compound the underlying problem which caused the initial failure. Users should also be aware

     * that once retrying is exhausted the error is surfaced to the runner which <em>may</em> then

     * opt to retry the current partition in entirety or abort if the max number of retries of the

     * runner is completed. Retrying uses an exponential backoff algorithm, with minimum backoff of

     * 5 seconds and then surfacing the error once the maximum number of retries or maximum

     * configuration duration is exceeded.

     *

     * <p>Example use:

     *

     * <pre>{@code

     * SolrIO.write()

     *   .withRetryConfiguration(SolrIO.RetryConfiguration.create(10, Duration.standardMinutes(3))

     *   ...

     * }</pre>

     *

     * @param retryConfiguration the rules which govern the retry behavior

     * @return the {@link Write} with retrying configured

     */

    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {

      checkArgument(retryConfiguration != null, "retryConfiguration is required");

      return builder().setRetryConfiguration(retryConfiguration).build();

    }



    @Override

    public PDone expand(PCollection<SolrInputDocument> input) {

      checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");

      checkState(getCollection() != null, "to() is required");



      input.apply(ParDo.of(new WriteFn(this)));

      return PDone.in(input.getPipeline());

    }



    @VisibleForTesting

    static class WriteFn extends DoFn<SolrInputDocument, Void> {

      @VisibleForTesting

      static final String RETRY_ATTEMPT_LOG = "Error writing to Solr. Retry attempt[%d]";



      private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);



      private transient FluentBackoff retryBackoff; // defaults to no retrying

      private final Write spec;

      private transient AuthorizedSolrClient solrClient;

      private Collection<SolrInputDocument> batch;



      WriteFn(Write spec) {

        this.spec = spec;

      }



      @Setup

      public void setup() throws Exception {

        solrClient = spec.getConnectionConfiguration().createClient();



        retryBackoff =

            FluentBackoff.DEFAULT

                .withMaxRetries(0) // default to no retrying

                .withInitialBackoff(RETRY_INITIAL_BACKOFF);



        if (spec.getRetryConfiguration() != null) {

          // FluentBackoff counts retries excluding the original while we count attempts

          // to remove ambiguity (hence the -1)

          retryBackoff =

              retryBackoff

                  .withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)

                  .withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());

        }

      }



      @StartBundle

      public void startBundle(DoFn.StartBundleContext context) {

        batch = new ArrayList<>();

      }



      @ProcessElement

      public void processElement(ProcessContext context) throws Exception {

        SolrInputDocument document = context.element();

        batch.add(document);

        if (batch.size() >= spec.getMaxBatchSize()) {

          flushBatch();

        }

      }



      @FinishBundle

      public void finishBundle(DoFn.FinishBundleContext context) throws Exception {

        flushBatch();

      }



      // Flushes the batch, implementing the retry mechanism as configured in the spec.

      private void flushBatch() throws IOException, InterruptedException {

        if (batch.isEmpty()) {

          return;

        }

        try {

          UpdateRequest updateRequest = new UpdateRequest();

          updateRequest.add(batch);



          Sleeper sleeper = Sleeper.DEFAULT;

          BackOff backoff = retryBackoff.backoff();

          int attempt = 0;

          while (true) {

            attempt++;

            try {

              solrClient.process(spec.getCollection(), updateRequest);

              break;

            } catch (Exception exception) {



              // fail immediately if no retry configuration doesn't handle this

              if (spec.getRetryConfiguration() == null

                  || !spec.getRetryConfiguration().getRetryPredicate().test(exception)) {

                throw new IOException(

                    "Error writing to Solr (no attempt made to retry)", exception);

              }



              // see if we can pause and try again

              if (!BackOffUtils.next(sleeper, backoff)) {

                throw new IOException(

                    String.format(

                        "Error writing to Solr after %d attempt(s). No more attempts allowed",

                        attempt),

                    exception);



              } else {

                // Note: this used in test cases to verify behavior

                LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), exception);

              }

            }

          }

        } finally {

          batch.clear();

        }

      }



      @Teardown

      public void closeClient() throws IOException {

        if (solrClient != null) {

          solrClient.close();

        }

      }

    }

  }

}